-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Preserve original field order when merging parquet Fields #213
Conversation
We use the `Migrations.mergeSchema` function in the loaders, for combining a series of schemas (e.g. `1-0-0`, `1-0-1`, `1-0-2`) into a reconciled column Before this PR, `mergeSchemas` contained some logic to preserve field order... but it was the wrong logic. It preserved field order of the newer schema, ignoring whether a field was present in the older schema. After this PR, we preserve field order of the older schema. New fields added in a later schema are appended to the end of the field list. This feature change is needed for loaders that only allow field additions when they are appended to the end of a struct. E.g. the Lake Loader for Hudi/Glue.
See snowplow/schema-ddl#213 When a schema is evolved (e.g. from `1-0-0` to `1-0-1`) we create a merged struct column combining fields from new and old schema. For some loaders it is important that newly-added nested fields come after the original fields. E.g. Lake Loader with Hudi and Glue sync enabled.
See snowplow/schema-ddl#213 When a schema is evolved (e.g. from `1-0-0` to `1-0-1`) we create a merged struct column combining fields from new and old schema. For some loaders it is important that newly-added nested fields come after the original fields. E.g. Lake Loader with Hudi and Glue sync enabled.
This overcomes a limitation with how Hudi syncs schemas to the Glue catalog. Previously, if version `1-0-0` of a schema had fields `a` and `b`, and then vesion `1-0-1` adds a field `c`, then the new field might be added _before_ the original fields in the Hudi schema. The new field would get synced to Glue, but only for new partitions; it is not back-filled to existing partitions. After this change, the new field `c` is added _after_ the original fields `a` and `b` in the Hudi schema. Then there is no need to sync the new field to existing partitions in Glue. The problem manifested in AWS Athena with a message like: > HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. This fix was implemented in snowplow/schema-ddl#213 and imported via a new version of common-streams. This change does not impact Delta or Iceberg, where nothing was broken.
This overcomes a limitation with how Hudi syncs schemas to the Glue catalog. Previously, if version `1-0-0` of a schema had fields `a` and `b`, and then vesion `1-0-1` adds a field `c`, then the new field might be added _before_ the original fields in the Hudi schema. The new field would get synced to Glue, but only for new partitions; it is not back-filled to existing partitions. After this change, the new field `c` is added _after_ the original fields `a` and `b` in the Hudi schema. Then there is no need to sync the new field to existing partitions in Glue. The problem manifested in AWS Athena with a message like: > HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. This fix was implemented in snowplow/schema-ddl#213 and snowplow-incubator/common-streams#98 and imported via a new version of common-streams. This change does not impact Delta or Iceberg, where nothing was broken.
This overcomes a limitation with how Hudi syncs schemas to the Glue catalog. Previously, if version `1-0-0` of a schema had fields `a` and `b`, and then vesion `1-0-1` adds a field `c`, then the new field might be added _before_ the original fields in the Hudi schema. The new field would get synced to Glue, but only for new partitions; it is not back-filled to existing partitions. After this change, the new field `c` is added _after_ the original fields `a` and `b` in the Hudi schema. Then there is no need to sync the new field to existing partitions in Glue. The problem manifested in AWS Athena with a message like: > HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. This fix was implemented in snowplow/schema-ddl#213 and snowplow-incubator/common-streams#98 and imported via a new version of common-streams. This change does not impact Delta or Iceberg, where nothing was broken.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great !
Field("xxx", Type.String, Required), | ||
Field("new1", Type.String, Required), | ||
Field("yyy", Type.String, Required), | ||
Field("new2", Type.String, Required), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be nice to switch new1
and new2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I switch new1
and new2
in the input, then I'll also need to swtich new1
and new2
in the expected output. It's because the alphabetical ordering only happens when we call Field.normalize
, and that is not part of this test.
You have made me realize though that this test is not realistic! Because realistically we always call normalize
first, and mergeSchemas
afterwards. So to make it realistic, struct1
should start with fields vvv
, xxx
, zzz
, and the new added fields should be www
and yyy
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I switch new1 and new2 in the input, then I'll also need to swtich new1 and new2 in the expected output
Yes this is what I meant, to make sure that the "wrong" alphabetical order is preserved.
and that is not part of this test
Alright !
Because realistically we always call normalize first, and mergeSchemas afterwards
I'm very glad that you say that ! I was wondering in which case we were calling normalize
and in each case we were not. Shouldn't mergeSchemas
be in charge of calling normalize
then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't
mergeSchemas
be in charge of callingnormalize
then?
Yeah that's a really good point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@benjben I pushed a commit in which mergeSchemas
is in change of calling normalize
.
...but unfortunately I think I need to revert it. It's because of this comment
Must normalize first and add the schema key field after. To avoid unlikely weird issues with similar existing keys.
To be strictly correct under all circumstances, the order of operations must be:
- Normalize the field
- Add the
_schema_version
key. - Merge the fields.
While it is probably ok to mix up that order for most schemas, it is not guaranteed to be ok for all possible schemas. And I cannot think of a clever way to keep that order, while also making mergeSchemas
responsible for calling normalize
.
So on balance I think it is better to say we always normalize first and merge afterwards.
Maybe we can improve on this in future -- but this is stretching beyond what I'm trying to address in this current PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't addSchemaVersion
just be a boolean
parameter of mergeSchemas
?
Otherwise that's fine to leave it as is for now, at least we have considered the options.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It could yeah... this is the logic that we would have to migrate from common-streams over to schema-ddl. But it's a bit awkward because there is some stuff there which is "snowplow-aware" -- meaning, how we add the _schema_version
key to objects within a contexts array. Whereas schema-ddl currently doesn't know about snowplow details like how contexts are arrays.
Plus if we make any change to mergeSchemas
then we need to carry it over to rdb-loader, which is a bit fragile at the moment.
I do like the ideas to tidy-up the inter-dependencies between two libraries (schema-ddl and common-streams), but I'm going to skip it for this PR.
modules/core/src/main/scala/com.snowplowanalytics/iglu.schemaddl/parquet/Migrations.scala
Show resolved
Hide resolved
See snowplow/schema-ddl#213 When a schema is evolved (e.g. from `1-0-0` to `1-0-1`) we create a merged struct column combining fields from new and old schema. For some loaders it is important that newly-added nested fields come after the original fields. E.g. Lake Loader with Hudi and Glue sync enabled.
See snowplow/schema-ddl#213 When a schema is evolved (e.g. from `1-0-0` to `1-0-1`) we create a merged struct column combining fields from new and old schema. For some loaders it is important that newly-added nested fields come after the original fields. E.g. Lake Loader with Hudi and Glue sync enabled.
This overcomes a limitation with how Hudi syncs schemas to the Glue catalog. Previously, if version `1-0-0` of a schema had fields `a` and `b`, and then vesion `1-0-1` adds a field `c`, then the new field might be added _before_ the original fields in the Hudi schema. The new field would get synced to Glue, but only for new partitions; it is not back-filled to existing partitions. After this change, the new field `c` is added _after_ the original fields `a` and `b` in the Hudi schema. Then there is no need to sync the new field to existing partitions in Glue. The problem manifested in AWS Athena with a message like: > HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. This fix was implemented in snowplow/schema-ddl#213 and snowplow-incubator/common-streams#98 and imported via a new version of common-streams. This change does not impact Delta or Iceberg, where nothing was broken.
This overcomes a limitation with how Hudi syncs schemas to the Glue catalog. Previously, if version `1-0-0` of a schema had fields `a` and `b`, and then vesion `1-0-1` adds a field `c`, then the new field might be added _before_ the original fields in the Hudi schema. The new field would get synced to Glue, but only for new partitions; it is not back-filled to existing partitions. After this change, the new field `c` is added _after_ the original fields `a` and `b` in the Hudi schema. Then there is no need to sync the new field to existing partitions in Glue. The problem manifested in AWS Athena with a message like: > HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. This fix was implemented in snowplow/schema-ddl#213 and snowplow-incubator/common-streams#98 and imported via a new version of common-streams. This change does not impact Delta or Iceberg, where nothing was broken.
We use the
Migrations.mergeSchema
function in the loaders, for combining a series of schemas (e.g.1-0-0
,1-0-1
,1-0-2
) into a reconciled columnBefore this PR,
mergeSchemas
contained some logic to preserve field order... but it was the wrong logic. It preserved field order of the newer schema, ignoring whether a field was present in the older schema.After this PR, we preserve field order of the older schema. New fields added in a later schema are appended to the end of the field list.
This feature change is needed for loaders that only allow field additions when they are appended to the end of a struct. E.g. the Lake Loader for Hudi/Glue.